package pub.ryan.dw.pub.ryan.dw.idmp

import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import pub.ryan.commons.util.SparkUtil

import scala.tools.scalap.scalax.util.StringUtil

//id-mapping图计算api
/*
13487545430,张三,wx_zs,20000
13487567565,小张,wx_zs,30000
,张三,wx_xs,50000
13927545201,李四,wx_ls,10000
13927545201,小四,wx_xs,21000
13027545488,小四四,wx_xs,18000
 */
object Demo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkUtil.getSparkSession(this.getClass.getSimpleName)
    val ds: Dataset[String] = sparkSession.read.textFile("data\\graphx\\input\\dehua.txt")
    //加入隐式转换
    import sparkSession.implicits._

    //构造一个点RDD
    val vertices: RDD[(Long, String)] = ds.rdd.flatMap(line => {
      val fields: Array[String] = line.split(",")

      //在spark图计算api中，点需要表示成一个tuple-->(点的唯一标识，点的数据）
      for (ele <- fields if StringUtils.isNotBlank(ele))
        yield (ele.hashCode.toLong, ele)
    })

    //构造一个边RDD
    //spark graphx对边的一个描述：Edge(起始点，目标点，边数据）
    val edges = ds.rdd.flatMap(line => {
      val fields: Array[String] = line.split(",")
      //为了避免越界，长度直接减2
      for (i <- 0 to fields.length - 2 if StringUtils.isNotBlank(fields(i))) yield Edge(fields(i).hashCode.toLong, fields(i + 1).hashCode.toLong, "")
    })


    //用点集合和边集合构造一张图，求最大联通子图
    val graph: Graph[String, String] = Graph(vertices, edges)
    val graph_new: Graph[VertexId, String] = graph.connectedComponents()
    //得到一个新的点数据(点id,点数据)，当数据是同一个图时，它的点数据是一样的
    //即同一组的（0，0）（1，0）（2，0）（3，0） 另一组的（4，1)(5,1)(6,1) 更多组（m-2,n)（m-1,n)（m,n)（m+1,n)
    val vertices_new: VertexRDD[VertexId] = graph_new.vertices

    //将得到的映射关系rdd，收集到Driver端，然后作为变量广播出去
    val idToMap: collection.Map[VertexId, VertexId] = vertices_new.collectAsMap()
    val bc: Broadcast[collection.Map[VertexId, VertexId]] = sparkSession.sparkContext.broadcast(idToMap)


    //利用这个映射关系结果，来加工我们的原始数据
    val res = ds.map(line => {
      val bc_map: collection.Map[VertexId, VertexId] = bc.value
      val name: String = line.split(",").filter(StringUtils.isNotBlank(_))(0)
      val gid: VertexId = bc_map.get(name.hashCode.toLong).get
      gid + "," + line
    })

    res.show(10, false)

    sparkSession.close()
  }

}
